
feat(python): expose rust writer as additional engine #1872

Conversation

@ion-elgreco (Collaborator) commented Nov 16, 2023

Description

First version of a functional Python binding to the Rust writer. Lots of work is happening around the writer; #1820 should be merged first.

A couple of gaps will exist between the current Rust writer and the pyarrow writer. These will have to be solved in a later PR:

  • Overwrite schema in Rust
  • Replacewhere (partition filter / predicate) overwrite

Related Issue(s)

@github-actions bot added the binding/python (Issues for the Python package), binding/rust (Issues for the Rust crate), and crate/core labels on Nov 16, 2023
@ion-elgreco (Collaborator, Author) commented Nov 16, 2023

@wjones127 @roeap Any early feedback on how to tackle the missing features and whether I am overlooking something would be much appreciated 😄.

@roeap (Collaborator) left a comment

Certainly :).

Overall I believe this is going in the right direction. One thing that may be a larger piece of work is not collecting the iterator, but rather wrapping it in an execution plan. Maybe we could also live with collect for now to get the wiring right, and leave the execution plan to a follow-up.

configuration: Option<HashMap<String, Option<String>>>,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<()> {
    let batches = data.0.map(|batch| batch.unwrap()).collect::<Vec<_>>();
Collaborator

We should probably try not to fully materialize the RBR (RecordBatchReader) into memory. This would entail wrapping the RBR in an ExecutionPlan, which can be passed to the CreateBuilder. Maybe DatasetExec from datafusion-python can provide some inspiration.

Collaborator Author

Maybe we can do that in a second PR then, since I was also doing it like this for the MERGE Python binding. Then I can improve these two at the same time.
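
Even with collect kept for now, the unwrap in the snippet above could propagate errors to Python instead of panicking. A minimal sketch, assuming data.0 is an iterator of Result<RecordBatch, ArrowError> (as with an ArrowArrayStreamReader) and pyo3's PyValueError:

use pyo3::exceptions::PyValueError;

// Turn each Arrow read error into a Python ValueError rather than panicking.
let batches = data
    .0
    .map(|batch| batch.map_err(|err| PyValueError::new_err(err.to_string())))
    .collect::<Result<Vec<_>, _>>()?;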

python/src/lib.rs (outdated, resolved)
@@ -108,6 +108,8 @@ pub struct WriteBuilder {
write_batch_size: Option<usize>,
/// RecordBatches to be written into the table
batches: Option<Vec<RecordBatch>>,
/// whether to overwrite the schema
overwrite_schema: bool,
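
For context, the matching builder setter would presumably follow the same pattern as the other WriteBuilder options; a hypothetical sketch, not necessarily the PR's exact code:

impl WriteBuilder {
    /// Whether to overwrite the table schema with the schema of the incoming data
    pub fn with_overwrite_schema(mut self, overwrite_schema: bool) -> Self {
        self.overwrite_schema = overwrite_schema;
        self
    }
}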
Collaborator

Is this meant for schema evolution? If so, I'd recommend moving that to a follow-up PR, as it would likely blow up this PR quite a bit.

Collaborator

+1. I think it's fine if we let that return NotImplementedError for now.

Collaborator Author

It was a quick attempt at schema evolution. I was able to write, except it didn't write the columns that were not part of the original schema, so I need to dig through the code more.

Ok, let's do this as an improvement in another update.

Collaborator

We would also need to update all read paths to always add null columns for columns that don't exist in older parquet files. I haven't looked into it, but this would likely require some larger refactoring, particularly in the DataFusion DeltaScan. That said, we likely also need to validate that added columns are always nullable.
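
To illustrate the read-path change described above, here is a rough sketch of padding an old file's batch with null columns (assuming the arrow crate; pad_to_schema is a hypothetical helper, not existing delta-rs code):

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

/// Hypothetical helper: make a batch read from an older parquet file match an
/// evolved table schema by filling the missing columns with nulls.
fn pad_to_schema(batch: &RecordBatch, table_schema: &SchemaRef) -> RecordBatch {
    let columns: Vec<ArrayRef> = table_schema
        .fields()
        .iter()
        .map(|field| match batch.column_by_name(field.name()) {
            Some(col) => col.clone(),
            // Only sound if the added column is nullable, hence the validation point above.
            None => new_null_array(field.data_type(), batch.num_rows()),
        })
        .collect();
    RecordBatch::try_new(table_schema.clone(), columns)
        .expect("columns were built to match the target schema")
}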

Collaborator Author

I think that would be schema evolution for appends as well.

The PyArrow writer can do schema evolution, but only combined with an overwrite mode.

I think that's purely a metadata action then. Would this be doable with the existing deltalake-core crate?

Collaborator

Not sure; we may end up with unreadable tables if we do this... if we replace the whole table, this might work.

Collaborator Author

With PyArrow it only works together with overwrite, so it should be safe. Is there a way to adjust the commit that's written during a write?

@ion-elgreco force-pushed the feat/expose_rust_writer_as_optional_engine branch from 2932609 to 4f8f359 on November 17, 2023 08:01
@ion-elgreco force-pushed the feat/expose_rust_writer_as_optional_engine branch from dd0b4da to 9911574 on November 18, 2023 14:38
@ion-elgreco marked this pull request as ready for review on November 18, 2023 14:49
@ion-elgreco (Collaborator, Author)

@roeap @wjones127 it's ready for a full review now. That one failing test will be solved when we merge PR #1820. I've also updated the PR description; basically, we have two gaps between the pyarrow and Rust writers.

@roeap (Collaborator) commented Nov 18, 2023

While looking through this I realized we may have a bug with the schema overwrite right now.

When we added this, overwrite would always replace the entire table. Since then we have added selectively overwriting partitions, in which case I believe we may be corrupting the table, or at least making it unreadable to our readers, since we end up with parquet files with different schemas in the table.

Not sure if the pyarrow Dataset can handle that, but I am almost certain our Rust readers would not be able to.

Should we include an additional check that no partition filters are supplied if the schema is overwritten?

@ion-elgreco (Collaborator, Author)

> Should we include an additional check that no partition filters are supplied if the schema is overwritten?

For the Rust writer? Because currently only pyarrow is using them.

@roeap (Collaborator) commented Nov 18, 2023

> For the Rust writer? Because currently only pyarrow is using them.

Yes, and the pyarrow writer may be creating corrupted tables that we cannot read from Rust, and maybe even from Python, if partition filters are supplied and the schema is updated. Essentially we may need to handle very different data for some of the files we read, as we make no checks on what the schema change looks like.
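
The proposed check could be a simple guard in the write operation; a hedged sketch (overwrite_schema and partition_filters stand in for the builder's state, and DeltaTableError::Generic is used for illustration):

// Reject a schema overwrite when only a subset of partitions is being
// replaced, since mixing file schemas would leave the table unreadable.
if overwrite_schema && !partition_filters.is_empty() {
    return Err(DeltaTableError::Generic(
        "schema overwrite requires replacing the whole table; remove the partition filters".to_string(),
    ));
}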

roeap and others added 8 commits November 20, 2023 21:41
Exposes the `convert to delta` functionality added by @junjunjd to the Python API.

- closes delta-io#1767

---------

Co-authored-by: Robert Pack <[email protected]>
# Description
This refactors the merge operation to use DataFusion's DataFrame and LogicalPlan APIs.

The nested loop join (NLJ) is eliminated and the query planner can pick the optimal join operator. This also enables the operation to use multiple threads and should result in a significant speed-up.

Merge is still limited to using a single thread in some areas. When collecting benchmarks, I encountered multiple OoM issues with DataFusion's hash join implementation. There are multiple tickets open upstream regarding this. For now, I've limited the number of partitions to just 1 to prevent this; a sketch of that cap follows.
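
A sketch of how that cap might look with DataFusion's SessionConfig (an assumption about the wiring, not the PR's exact code):

use datafusion::prelude::{SessionConfig, SessionContext};

// Cap DataFusion at a single target partition to sidestep the
// hash-join OoM issues described above.
let config = SessionConfig::new().with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);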

Predicates passed as SQL are also easier to use now. Previously, manual casting was required to ensure data types were aligned; now the logical plan will perform type coercion when optimizing the plan.

# Related Issues
- enhances delta-io#850
- closes delta-io#1790 
- closes delta-io#1753
# Description
Implements benchmarks that are similar to Spark's Delta benchmarks.

This enables us to have a standard benchmark to measure improvements to merge, and some pieces can be factored out to build a framework for benchmarking delta workflows.
@ion-elgreco (Collaborator, Author)

Closing this in favour of: #1891

Labels
binding/python (Issues for the Python package), binding/rust (Issues for the Rust crate), crate/core
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add a Rust-backed engine for write_deltalake
4 participants